Amazon RedshiftからAmazon Kinesisのストリームデータをニアリアルタイムにインジェストしてみた
Amazon RedshiftからAmazon Kinesis Data Streamのストリームデータをマテリアライズド・ビュー経由でニアリアルタイムにインジェストする機能が2022年2月からパブリック・プレビュー提供されています。
Amazon Redshift announces public preview of Streaming Ingestion for Kinesis Data Streams
Amazon Kinesis Data StreamのストリームデータをRedshiftから利用したい場合、従来は Amazon Kinesis Data Firehoseを経由して一度 S3 に出力し、S3 データを Redshift に COPY する必要があり、ストイームデータのリアルタイム性が損なわれていました。
本機能を利用すると、マテリアライズド・ビューの更新レイテンシーが発生するものの、ニアリアルタイムにKinesis Data Streamsのデータをインジェストできます。
やってみた
Kinesis Data StreamsにPUTされたレコードをRedshiftからSQLで参照するところまでを動作確認します。
1. Kinesis Data Stream の作成
ストリームデータを送信する Kinesis Data Stream を作成します。
このストリームには、以下の様なフォーマットのレコードを送信します。
{ "user": 2, "heartrate": 143, "power": 194, "cadence": 93, "timestamp": "2022-05-20 14:50:09" }
2. Redshift に Kinesis Data Streams用ポリシーを付与
RedshiftがKinesis Data Streamsにアクセスできるよう、Redshiftクラスターに、以下のポリシーを付与したIAMロールを適用します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadStream", "Effect": "Allow", "Action": [ "kinesis:DescribeStreamSummary", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream" ], "Resource": "arn:aws:kinesis:*:0123456789:stream/*" }, { "Sid": "ListStream", "Effect": "Allow", "Action": [ "kinesis:ListStreams", "kinesis:ListShards" ], "Resource": "*" } ] }
3. Kinesis用外部スキーマの作成
Amazon Redshift Spectrumと同様に、Redshift外のデータベースである Kinesis Data Streams を参照するために、外部スキーマを作成します。
CREATE EXTERNAL SCHEMA schema_one FROM KINESIS IAM_ROLE 'arn:aws:iam::0123456789:role/redshift-streaming-role';
IAM_ROLE
には、手順2 で適用した IAM ロールの ARN を設定します。
4. マテリアライズド・ビューの作成
Kinesisの基本情報とレコード(Data
)だけを取得するマテリアライズド・ビューを作成します。
CREATE MATERIALIZED VIEW view_foo AS SELECT approximatearrivaltimestamp, partitionkey, shardid, sequencenumber, JSON_PARSE(from_varbyte(Data, 'utf-8')) as Data FROM schema_one.YOUR_STREAM_NAME
YOUR_STREAM_NAME
には 手順1で作成したストリーム名を設定します。
5. マテリアライズド・ビューの更新
KinesisからRedshiftのマテリアライズド・ビューにデータを引っ張ってくるために、ビューをリフレッシュします。
REFRESH MATERIALIZED VIEW view_foo
初回リフレッシュ時には、ストリームに存在するすべてのデータを同期します(ストリームのチェックポイントはTRIM_HORIZON
)。以降は差分同期です。
6. マテリアライズド・ビューの参照
ストリームデータを覗いてみます。
select * from view_foo limit 5
approximatearrivaltimestamp | partitionkey | shardid | sequencenumber | data |
---|---|---|---|---|
2022-05-20 14:50:10 | 881de89a8c61475a82ecb424f373effc | shardId-000000000000 | 49629688051866322531082401886402786968421632977418584066 | {"user":2,"heartrate":143,"power":194,"cadence":93,"timestamp":"2022-05-20 14:50:09"} |
2022-05-20 14:50:10 | 0c9a133651a846fe87bd1680d45b286c | shardId-000000000000 | 49629688051866322531082401886403995894241247606593290242 | {"user":1,"heartrate":146,"power":218,"cadence":97,"timestamp":"2022-05-20 14:50:10"} |
2022-05-20 14:50:12 | 45ec6b0130c34bd4afc13a4e9b3c4972 | shardId-000000000000 | 49629688051866322531082401886407622671700091562836885506 | {"user":1,"heartrate":139,"power":209,"cadence":93,"timestamp":"2022-05-20 14:50:11"} |
2022-05-20 14:50:15 | 5c7b5efa1d9940e0a72ab06e3d0f9b5b | shardId-000000000000 | 49629688051866322531082401886414876226617779544043552770 | {"user":2,"heartrate":137,"power":186,"cadence":90,"timestamp":"2022-05-20 14:50:14"} |
2022-05-20 14:50:17 | 083b690087414ce19d48eb9aefa048fd | shardId-000000000000 | 49629688051866322531082401886419711929896238266900807682 | {"user":2,"heartrate":129,"power":175,"cadence":85,"timestamp":"2022-05-20 14:50:16"}% |
Kinesis Data Streamsのデータを取得できています。
パーティションキー(partitionkey
)やシャードID(shardid
)などのKinesisストリームの付加情報はデバッグなどで活用しましょう。
7. 複雑なマテリアライズド・ビューの定義
Kinesis には JSON 形式のレコードが送信されています。 属性ごとに展開したマテリアライズド・ビューを定義します。
CREATE MATERIALIZED VIEW view_foo_extract DISTKEY(2) sortkey(1) AS SELECT approximatearrivaltimestamp, json_extract_path_text(from_varbyte(data, 'utf-8'), 'user') :: INT as user_id, json_extract_path_text(from_varbyte(data, 'utf-8'), 'timestamp') :: varchar(20) as generated_time, json_extract_path_text(from_varbyte(data, 'utf-8'), 'power') :: INT as power, json_extract_path_text(from_varbyte(data, 'utf-8'), 'cadence') :: INT as cadence, json_extract_path_text(from_varbyte(data, 'utf-8'), 'heartrate') :: INT as heartrate FROM schema_one.foo;
このビューでは、さらに以下を行っています。
- ユーザーID(
user_id
)でデータ分散 - Kinesisにレコード送信された時刻(
approximatearrivaltimestamp
)でソート
先程と同じく、ビューをリフレッシュし、レコードを確認します。
REFRESH MATERIALIZED VIEW view_foo_extract select * from view_foo_extract limit 5
approximatearrivaltimestamp | user_id | generated_time | power | cadence | heartrate |
---|---|---|---|---|---|
2022-05-20 14:50:10 | 2 | 2022-05-20 14:50:09 | 194 | 93 | 143 |
2022-05-20 14:50:11 | 2 | 2022-05-20 14:50:10 | 191 | 92 | 141 |
2022-05-20 14:50:11 | 5 | 2022-05-20 14:50:11 | 227 | 97 | 150 |
2022-05-20 14:50:13 | 2 | 2022-05-20 14:50:12 | 191 | 92 | 141 |
2022-05-20 14:50:13 | 5 | 2022-05-20 14:50:13 | 221 | 94 | 146 |
過去3分のレコードを対象に ユーザx分単位でデータをサマってみましょう。
select user_id, to_timestamp(generated_time, 'YYYY-MM-DD HH24:MI') as minute, avg(power) from view_foo_extract where approximatearrivaltimestamp > current_timestamp - interval '3 minutes' group by user_id, to_timestamp(generated_time, 'YYYY-MM-DD HH24:MI') order by 1, 2
user_id | minute | avg |
---|---|---|
1 | 2022-05-20 17:20:00+00 | 461 |
1 | 2022-05-20 17:21:00+00 | 371 |
1 | 2022-05-20 17:22:00+00 | 424 |
2 | 2022-05-20 17:20:00+00 | 110 |
2 | 2022-05-20 17:21:00+00 | 98 |
2 | 2022-05-20 17:22:00+00 | 64 |
5 | 2022-05-20 17:20:00+00 | 257 |
5 | 2022-05-20 17:21:00+00 | 241 |
5 | 2022-05-20 17:22:00+00 | 260 |
8. ニアリアルタイム処理の確認
ニアリアルタイムに処理できていることを確認するために、マテリアライズド・ビューをリフレッシュ後、過去3分を対象に、Kinesisにレコード送信された時刻(approximatearrivaltimestamp
)の最小、最大値を確認します。
SELECT current_timestamp, min(approximatearrivaltimestamp), max(approximatearrivaltimestamp) FROM view_foo WHERE approximatearrivaltimestamp > current_timestamp - interval '3 minutes'
timestamptz | min | max |
---|---|---|
2022-05-20 17:16:58.891641+00 | 2022-05-20 17:13:59 | 2022-05-20 17:16:49 |
- 現在時刻 17:16:58.891641 に対して、最新のレコードは9秒前の 17:16:49
- 3分前は 17:13:58.891641 に対して、3分以内の最古のレコードはほぼ同時刻の 17:13:59
とニアリアルタイムに処理されています。
ストリームデータはビューをリフレッシュしないと更新されないため、クエリーのスケジュール機能を利用し、マテリアライズド・ビューのリフレッシュを定期実行しましょう。
アーキテクチャー
AWSの機能紹介動画で、アーキテクチャーが紹介されていました。
永続的なデータ向けテーブル(permanent table)とストリームデータ用テーブル(Streaming Table)が別れていますね。
さらに、ストリーム用テーブルには Kinesis Data Streams だけでなく、"Amazon Managed Kafka Service ... others"の文字も見えます。
Redshiftがストリームデータに対応し、その第一弾として Kinesis Data Streams 対応が発表されたとみなせ、今後も続々とストリーム対応が進みそうです。
まとめ
Amazon RedshiftからAmazon Kinesis Data Streamのストリームデータをマテリアライズド・ビュー経由でニアリアルタイムにインジェストする機能がプレビュー提供されています。
データパイプラインの観点からは、Firehose・S3を挟むことで構成が複雑になっていたり、データ処理までのレイテンシーが発生するといった課題が解消されます。
フェデレーテッド・クエリの観点からは、Redshiftが新たにストリームデータにも対応したとみなせます。
本機能はKinesis Data StreamsとシームレスにSQL連携できるため、将来的にはKinesis Data Analyticsが担っていた処理の一部もRedshiftへの移行が進むでしょう。
繰り返しとなりますが、本機能はパブリックプレビューで提供されているため、正式版のリリースまでに機能や仕様は変更される可能性があります。 あくまでも検証目的でご利用ください。